package com.dmp.report

import com.dmp.config.ConfigHandler
import com.google.gson.Gson
import com.oracle.beans.{ProCity2, RptArea}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}

/**
  * Spark batch job that aggregates advertising indicators per (province, city).
  *
  * It reads a Parquet dataset from the path given as the single command-line argument,
  * derives nine per-row indicators (see [[areaMetrics]]), sums them per
  * `(provincename, cityname)` key and overwrites the JDBC table `orc_report_area`
  * with one [[RptArea]] row per key.
  */
object AreaAnalysisCore {

  /** Process exit status used when the command-line arguments are invalid. */
  private val InvalidArgsExitCode = 101

  /**
    * Entry point.
    *
    * @param args exactly one element: the input path of the Parquet data to analyse.
    *             Any other argument count prints the usage text and exits with status 101.
    */
  def main(args: Array[String]): Unit = {
    if (args.length != 1) {
      println(
        """
          |Usage:
          | com.dmp.report.AreaAnalysisCore
          | args:
          |     dataInputPath:  原始日志输入路径
        """.stripMargin)
      sys.exit(InvalidArgsExitCode)
    }
    val Array(inpath) = args
    Logger.getLogger("org").setLevel(Level.WARN)

    val sparkConf = new SparkConf()
      .setAppName("地域分布情况 MR版本")
      // Only fall back to local mode when no master was supplied (e.g. by spark-submit);
      // an unconditional setMaster would silently override the cluster master.
      .setIfMissing("spark.master", "local[*]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val sc = new SparkContext(sparkConf)
    try {
      val sQLContext = new SQLContext(sc)
      // Required for the RDD-to-DataFrame conversion (`toDF`) below.
      import sQLContext.implicits._

      // Load the input data.
      val dataFrame: DataFrame = sQLContext.read.parquet(inpath)

      // Go through `.rdd` explicitly so that the pair-RDD `reduceByKey` is available
      // regardless of what `DataFrame.map` returns in the Spark version in use.
      dataFrame.rdd
        .map { row =>
          // Dimension: province and city.
          val key = (row.getAs[String]("provincename"), row.getAs[String]("cityname"))

          // The two price columns are passed by name, so they are only read from
          // the row when the win condition inside areaMetrics holds.
          val metrics = areaMetrics(
            reqMode = row.getAs[Int]("requestmode"),
            proNode = row.getAs[Int]("processnode"),
            effTive = row.getAs[Int]("iseffective"),
            billing = row.getAs[Int]("isbilling"),
            isbid = row.getAs[Int]("isbid"),
            iswin = row.getAs[Int]("iswin"),
            adOrderId = row.getAs[Int]("adorderid"),
            winPrice = row.getAs[Double]("winprice"),
            adPayment = row.getAs[Double]("adpayment")
          )
          (key, metrics)
        }
        // Element-wise sum of the indicator lists that share a key.
        .reduceByKey((a, b) => a.zip(b).map { case (x, y) => x + y })
        .map { case ((province, city), m) =>
          // The first seven indicators are counts; the last two stay as Double.
          RptArea(
            province,
            city,
            m(0).toInt,
            m(1).toInt,
            m(2).toInt,
            m(3).toInt,
            m(4).toInt,
            m(5).toInt,
            m(6).toInt,
            m(7),
            m(8)
          )
        }
        .toDF()
        .write
        .mode(SaveMode.Overwrite)
        .jdbc(ConfigHandler.url, "orc_report_area", ConfigHandler.prop)
    } finally {
      // Always release the context, even when the read, aggregation or JDBC write fails.
      sc.stop()
    }
  }

  /**
    * Computes the nine indicators contributed by a single input record.
    *
    * The returned list always has exactly nine elements, in this order:
    *  0. 1 if `reqMode == 1 && proNode >= 1`, else 0
    *  1. 1 if `reqMode == 1 && proNode >= 2`, else 0
    *  2. 1 if `reqMode == 1 && proNode == 3`, else 0
    *  3. 1 if `effTive == 1 && billing == 1 && isbid == 1 && adOrderId != 0`, else 0
    *  4. 1 if `effTive == 1 && billing == 1 && iswin == 1` (the "win" condition), else 0
    *  5. 1 if `reqMode == 2 && effTive == 1`, else 0
    *  6. 1 if `reqMode == 3 && effTive == 1`, else 0
    *  7. `adPayment / 1000` when the win condition holds, else 0
    *  8. `winPrice / 1000` when the win condition holds, else 0
    *
    * @param winPrice  by-name; evaluated only when the win condition holds
    * @param adPayment by-name; evaluated only when the win condition holds
    * @return the indicators as a `List[Double]` so that they can be summed element-wise
    */
  private[report] def areaMetrics(
      reqMode: Int,
      proNode: Int,
      effTive: Int,
      billing: Int,
      isbid: Int,
      iswin: Int,
      adOrderId: Int,
      winPrice: => Double,
      adPayment: => Double
  ): List[Double] = {
    def flag(condition: Boolean): Double = if (condition) 1d else 0d

    val isRequest = reqMode == 1
    val billable = effTive == 1 && billing == 1

    val (winCount, paymentShare, priceShare) =
      if (billable && iswin == 1) {
        // Same read order as the original code: winprice first, then adpayment.
        val price = winPrice
        val payment = adPayment
        (1d, payment / 1000, price / 1000)
      } else (0d, 0d, 0d)

    List(
      flag(isRequest && proNode >= 1),
      flag(isRequest && proNode >= 2),
      flag(isRequest && proNode == 3),
      flag(billable && isbid == 1 && adOrderId != 0),
      winCount,
      flag(reqMode == 2 && effTive == 1),
      flag(reqMode == 3 && effTive == 1),
      paymentShare,
      priceShare
    )
  }
}
